Image Retrieval Project

Author

Steven Ndung’u

Published

August 29, 2024


Image Retrieval by Hashing based COSFIRE Descriptors Approach


Introduction

This work develops a compact image hash code learning framework based on COSFIRE filter banks for efficient similarity search and retrieval. Images are first passed through class-specific COSFIRE filters designed to respond to visually discriminative patterns; the filter responses form a descriptor (feature vector) for each image. For comprehensive details on COSFIRE filters, we refer the reader to (Ndung’u et al. 2024) and (Azzopardi and Petkov 2012). The descriptors are then fed to a multi-layer perceptron (MLP) network that learns binary hash codes capturing the semantic similarity of the images, so that a query can be matched against the hash codes of database images for fast retrieval. Our experiments on a radio galaxy image dataset demonstrate the potential of this straightforward approach for learning compact hash codes from rotation-invariant COSFIRE descriptors.

Tip

MLP is a type of artificial neural network consisting of multiple layers of neurons. The neurons in the MLP typically use nonlinear activation functions, allowing the network to learn complex patterns in data.

Data description

The input data consists of a set of descriptors extracted for each image using COSFIRE filters. Specifically, 93 COSFIRE filters are configured for each image class. In (Ndung’u et al. 2024), the best accuracy on the validation set was obtained with 372 COSFIRE filters in total (4 classes \(\times\) 93 filters per class). We therefore use the descriptors from these 93 COSFIRE filters per class as the basis for our image hashing experiments.

This process is applied to every image, resulting in a dataframe where each row contains the 372-element descriptor vector corresponding to one image. Each image is therefore represented by an n-dimensional vector (n = 372 in this case) of COSFIRE filter response values, which encode visual characteristics that help differentiate between classes.

Tip

The dataframe stores these image descriptor vectors, with each row representing a single image and each column representing the maximum response from one of the 372 total COSFIRE filters applied. This serves as the input data capturing image features that will be further transformed into a k-bit hash code for efficient similarity search and retrieval. The compact hash representation helps quickly locate the most similar images from the database given a new query image.
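A minimal sketch of how retrieval over k-bit codes can work, assuming the codes are stored as 0/1 NumPy arrays. The helper name `hamming_rank` and the toy codes are illustrative and are not taken from the project utilities.

```python
import numpy as np

def hamming_rank(query_code, database_codes, top_k=3):
    """Return the indices and distances of the top_k database codes closest to the query."""
    # Hamming distance = number of bit positions where two codes differ
    distances = np.count_nonzero(database_codes != query_code, axis=1)
    # Stable sort so that ties keep their database order
    order = np.argsort(distances, kind="stable")[:top_k]
    return order, distances[order]

# Toy database of four 8-bit codes (illustrative values only)
database = np.array([
    [0, 1, 0, 1, 1, 0, 0, 1],
    [1, 1, 0, 1, 1, 0, 0, 1],
    [0, 0, 1, 0, 0, 1, 1, 0],
    [0, 1, 0, 1, 1, 0, 1, 1],
])
query = np.array([0, 1, 0, 1, 1, 0, 0, 1])

indices, distances = hamming_rank(query, database)
print(indices, distances)
```

Because the comparison is a bitwise mismatch count, ranking a whole database reduces to one vectorized operation followed by a sort.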

Code
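import os
os.environ['PYTHONHASHSEED'] = 'python_envn'

# Standard library modules used in later cells
import ast
import glob
import random
import re

import numpy as np
import pandas as pd
from scipy import stats
from PIL import Image
from sklearn import preprocessing
from sklearn.manifold import TSNE

# PyTorch components used by the model, loss and data loaders
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from IPython.display import display, Markdown, HTML
from itables import init_notebook_mode, show
init_notebook_mode(all_interactive=True)

import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
pio.renderers.default = "notebook"

import seaborn as sns
import matplotlib.pyplot as plt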

Preview the data:

Code
#$Env:QUARTO_PYTHON = "C:\Users\P307791\Anaconda3\python.exe"
from cosfire_workflow_utils_12082024 import *
import json
from tqdm import tqdm
num = 1
# margin = 36
# alpha = 0.001
# epochs = 100
# batch_size = 32
# learning_rate = 0.01
# bitsize = 36
input_size = 372

data_path = f"./descriptor_set_{num}_train_valid_test.mat" # Path to the Train_valid_test.mat file
data_path_valid = f"descriptor_set_{num}_train_valid.mat" # Path to the Train_valid.mat file
data_path_test = f"descriptor_set_{num}_train_test.mat" # Path to the Train_test.mat file

output_dir = f'output{num}' 

# Create the output directory if it does not already exist
os.makedirs(output_dir, exist_ok=True)

#print(output_dir)

train_df,valid_df,test_df = get_and_check_data_prev(data_path,data_path_valid,data_path_test,dic_labels) 

data_preview = pd.concat([train_df.iloc[:, :10], train_df[['label_code']]], axis=1).head(10)
data_preview.columns = ['descrip_1', 'descrip_2', 'descrip_3', 'descrip_4', 'descrip_5',
       'descrip_6', 'descrip_7', 'descrip_8', 'descrip_9', 'descrip_10',
       'galaxy type']



display(Markdown(data_preview.to_markdown(index = True)))
|    | descrip_1 | descrip_2 | descrip_3 | descrip_4 | descrip_5 | descrip_6 | descrip_7 | descrip_8 | descrip_9 | descrip_10 | galaxy type |
|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| 0 | 0.0143011 | 0.0179688 | 0.0262467 | 0.0197133 | 0.0132749 | 0.0256719 | 0.0154136 | 0.0203668 | 0.0182121 | 0.0208825 | 0 |
| 1 | 0.00594451 | 0.0210936 | 0.0403211 | 0.0309183 | 0.00594909 | 0.0403366 | 0.0142293 | 0.0196551 | 0.0306603 | 0.0287716 | 0 |
| 2 | 0.00819936 | 0.0168289 | 0.0360158 | 0.0262702 | 0.00644101 | 0.0369505 | 0.0136889 | 0.0206689 | 0.0274735 | 0.0247877 | 0 |
| 3 | 0.0117083 | 0.0222082 | 0.0382847 | 0.0288466 | 0.0136789 | 0.0352442 | 0.0205771 | 0.0220805 | 0.0275772 | 0.0279186 | 0 |
| 4 | 0.0187394 | 0.0309713 | 0.0524503 | 0.0381965 | 0.0199479 | 0.0439544 | 0.0283139 | 0.0282423 | 0.0342867 | 0.0377899 | 0 |
| 5 | 0.0102484 | 0.0209022 | 0.0402256 | 0.0286324 | 0.0127513 | 0.0383189 | 0.0206139 | 0.0237921 | 0.0306305 | 0.0294629 | 0 |
| 6 | 0.0148918 | 0.0312111 | 0.0395833 | 0.0345283 | 0.0139807 | 0.0307287 | 0.0283663 | 0.0300278 | 0.0262398 | 0.0333521 | 0 |
| 7 | 0.0097114 | 0.0292753 | 0.0468838 | 0.0378348 | 0.00522875 | 0.040914 | 0.0223855 | 0.025033 | 0.0335629 | 0.0359013 | 0 |
| 8 | 0.0124266 | 0.0221114 | 0.0330158 | 0.0266443 | 0.00985906 | 0.0286542 | 0.0177679 | 0.0196203 | 0.0218583 | 0.0252806 | 0 |
| 9 | 0.0115169 | 0.0275402 | 0.0472276 | 0.0363543 | 0.0114842 | 0.0397568 | 0.0256311 | 0.026436 | 0.0318334 | 0.0352658 | 0 |

Model Training

Loss function

Let \(\Omega\) represent the COSFIRE descriptor embedding space for radio galaxy images. The objective is to discover a mapping function \(F : \Omega \rightarrow \{+1, -1\}^{k}\) that translates the embedding space to a k-bit binary code space. This mapping should be learned in such a way that visually or semantically similar radio galaxy images are assigned binary codes that are close to each other, while dissimilar images are mapped to binary codes that are far apart in the binary code space.

\[\begin{aligned} L_r\left(\mathbf{b}_1, \mathbf{b}_2, y\right) & =\frac{1}{2}(1-y)\left\|\mathbf{b}_1-\mathbf{b}_2\right\|_2^2 \\ & +\frac{1}{2} y \max \left(m-\left\|\mathbf{b}_1-\mathbf{b}_2\right\|_2^2, 0\right) \\ & +\alpha\left(\left\|\left|\mathbf{b}_1\right|-\mathbf{1}\right\|_1+\left\|\left|\mathbf{b}_2\right|-\mathbf{1}\right\|_1\right) \end{aligned}\]

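where \(\mathbf{b}_1\) and \(\mathbf{b}_2\) are the network outputs for a pair of images, the squared Euclidean distance \(\|\mathbf{b}_1-\mathbf{b}_2\|_2^2\) serves as a differentiable surrogate for the Hamming distance \(D_h(\cdot, \cdot)\) between two binary vectors, \(m > 0\) is a margin threshold parameter, and \(\alpha\) weights the regularization term.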

In this loss function: y = 0 if they are similar, and y = 1 otherwise

Note
  • The first term encourages similar pairs to have small distances: it penalizes similar images that are mapped to different binary codes.

  • The second term encourages dissimilar pairs to have distances greater than the margin m: it penalizes dissimilar images that are mapped to close binary codes, that is, when their distance falls below the margin threshold m. Only dissimilar pairs whose distance lies within this radius contribute to the loss.

  • The third term, weighted by \(\alpha\), is a regularizer that pushes each network output towards −1 or +1.
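To make the terms of the loss concrete, the following sketch evaluates it for single pairs of codes in plain Python. The function name `pair_loss` and the toy values (including m = 6 and α = 0.01) are assumptions chosen for illustration; the training code operates on whole mini-batches instead.

```python
def pair_loss(b1, b2, y, m, alpha):
    """Loss for one pair of codes; y = 0 for a similar pair, y = 1 for a dissimilar pair."""
    # Squared Euclidean distance between the two (relaxed) codes
    sq_dist = sum((p - q) ** 2 for p, q in zip(b1, b2))
    similar_term = 0.5 * (1 - y) * sq_dist
    dissimilar_term = 0.5 * y * max(m - sq_dist, 0.0)
    # Quantization penalty: L1 distance of |b| from the all-ones vector, for both codes
    quantization = sum(abs(abs(v) - 1.0) for v in b1) + sum(abs(abs(v) - 1.0) for v in b2)
    return similar_term + dissimilar_term + alpha * quantization

b1 = [1.0, -1.0, 1.0, 1.0]
b2 = [1.0, -1.0, -1.0, 1.0]    # differs from b1 in one bit
b3 = [-1.0, 1.0, -1.0, -1.0]   # differs from b1 in all four bits
half = [0.5, -0.5, 0.5, 0.5]   # identical pair, but far from +/-1

loss_similar = pair_loss(b1, b2, y=0, m=6.0, alpha=0.01)
loss_dissimilar_close = pair_loss(b1, b2, y=1, m=6.0, alpha=0.01)
loss_dissimilar_far = pair_loss(b1, b3, y=1, m=6.0, alpha=0.01)
loss_unquantized = pair_loss(half, half, y=0, m=6.0, alpha=0.01)
print(loss_similar, loss_dissimilar_close, loss_dissimilar_far, loss_unquantized)
```

For codes that are exactly ±1, the squared Euclidean distance equals four times the Hamming distance, which is why the relaxed loss tracks the Hamming-space objective.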

Suppose that there are N training pairs randomly selected from the training images, \(\{(I_{i,1}, I_{i,2}, y_i) \mid i = 1, \ldots, N\}\). Our goal is to minimize the overall loss function:

\[\begin{gathered} \mathcal{L}=\sum_{i=1}^N L_r\left(\mathbf{b}_{i, 1}, \mathbf{b}_{i, 2}, y_i\right) \\ \text { s.t. } \mathbf{b}_{i, j} \in\{+1,-1\}^k, i \in\{1, \ldots, N\}, j \in\{1,2\} \end{gathered}\]
Tip

Regularization: To reduce the discrepancy between the Euclidean space and the Hamming space, a commonly used relaxation scheme is to approximate the thresholding procedure with a sigmoid or tanh function. A regularizer is then applied so that the real-valued network outputs (from sigmoid, tanh, ReLU, etc.) approach the desired discrete values (e.g. 0/1, or −1/+1 as used here).
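As a small illustration of this relaxation (a sketch with made-up pre-activation values, not output from the trained model), the snippet below passes a few numbers through tanh, compares them with hard thresholding at zero, and computes the quantization penalty that the regularizer drives towards zero. The helper name `quantization_penalty` is hypothetical.

```python
import math

def quantization_penalty(outputs):
    """L1 distance between |outputs| and the all-ones vector."""
    return sum(abs(abs(v) - 1.0) for v in outputs)

pre_activations = [-4.0, -0.2, 0.1, 3.0]            # illustrative values
relaxed = [math.tanh(v) for v in pre_activations]   # real-valued, strictly inside (-1, 1)
binary = [1 if v >= 0 else -1 for v in relaxed]     # hard thresholding at 0

print([round(v, 3) for v in relaxed])
print(binary)
print(round(quantization_penalty(relaxed), 3))
print(quantization_penalty(binary))
```

Outputs close to zero (here from the pre-activations -0.2 and 0.1) dominate the penalty, so the regularizer mainly acts on bits whose sign is still uncertain.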

def DSHLoss(u, y, alpha, margin):
    # m > 0 is a margin. The margin defines a radius around each code: dissimilar
    # pairs contribute to the loss only if their distance is within this radius.
    m = 2 * margin
    y = y.int()
    n = len(y)
    # Build an N x N pair-label matrix: 0 if the pair shares a class (similar), 1 otherwise
    y = y.unsqueeze(1).expand(n, n)
    y_label = torch.ones(n, n, device=u.device)
    y_label[y == y.t()] = 0

    # Squared Euclidean distance between every pair of outputs in the mini-batch
    dist = torch.cdist(u, u, p=2).pow(2)
    loss1 = 0.5 * (1 - y_label) * dist
    loss2 = 0.5 * y_label * torch.clamp(m - dist, min=0)

    # Quantization penalty per sample: || |u| - 1 ||_1
    B1 = torch.norm(torch.abs(u) - 1, p=1, dim=1)
    # Expand to N x N so that B2 + B2.t() adds the penalties of both members of each pair
    B2 = B1.unsqueeze(1).expand(n, n)
    loss3 = (B2 + B2.t()) * alpha
    minibatch_loss = torch.mean(loss1 + loss2 + loss3)
    return minibatch_loss

Model

We train a simple MLP network architecture. By optimizing this loss, the network learns to transform the input descriptors into a latent feature space that accentuates the similarities and differences that are critical for distinguishing images. The resulting model provides a learning pipeline from COSFIRE descriptors to a compact k-bit hash code (k = 72 in the configuration shown below) suitable for efficient image retrieval and matching. Our experiments demonstrate the potential of this straightforward architecture and training approach for image hashing.

The model is a sequential stack of linear layers, batch normalization layers, and activation functions. It takes the 372-dimensional descriptor vector as input and passes it through two hidden linear layers with 300 and 200 units respectively, each followed by batch normalization and a Tanh activation. A final linear layer maps to bitsize outputs, followed by another Tanh activation so that every output lies in (−1, 1). In addition, the forward pass returns an L1/L2 penalty on the parameters, which is added to the training loss.
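As a rough sanity check on model size, the sketch below counts trainable parameters for the layer widths used in the `CosfireNet` definition (372 → 300 → 200 → bitsize), assuming each batch normalization layer contributes one scale and one shift per feature. The helper name `count_mlp_parameters` is hypothetical.

```python
def count_mlp_parameters(layer_sizes, batchnorm_on_hidden=True):
    """Count weights and biases of a fully connected stack, plus BatchNorm scale and shift."""
    total = 0
    n_layers = len(layer_sizes) - 1
    for index, (fan_in, fan_out) in enumerate(zip(layer_sizes[:-1], layer_sizes[1:])):
        total += fan_in * fan_out + fan_out      # weights + biases
        is_hidden = index < n_layers - 1
        if batchnorm_on_hidden and is_hidden:
            total += 2 * fan_out                 # BatchNorm scale and shift
    return total

n_params_72 = count_mlp_parameters([372, 300, 200, 72])
n_params_36 = count_mlp_parameters([372, 300, 200, 36])
print(n_params_72, n_params_36)
```

Most of the parameters sit in the first two layers, so changing the hash length only changes the size of the final linear layer.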

class CosfireNet(nn.Module):
    def __init__(self, input_size, bitsize, l1_reg, l2_reg):
        super(CosfireNet, self).__init__()
        self.l1_reg = l1_reg
        self.l2_reg = l2_reg
        self.hd = nn.Sequential(
            nn.Linear(input_size, 300),
            nn.BatchNorm1d(300),
            nn.Tanh(),
            nn.Linear(300, 200),
            nn.BatchNorm1d(200),
            nn.Tanh(),
            nn.Linear(200, bitsize),
            nn.Tanh()
        )

    def forward(self, x):        
      regularization_loss = 0.0
      for param in self.hd.parameters():
          regularization_loss += torch.sum(torch.abs(param)) * self.l1_reg  # L1 regularization
          regularization_loss += torch.sum(param ** 2) * self.l2_reg  # L2 regularization
      return self.hd(x), regularization_loss


# Data
class CosfireDataset(Dataset):
    def __init__(self, dataframe):
        self.data = torch.tensor(preprocessing.normalize(dataframe.iloc[:, :-1].values), dtype=torch.float32)
        self.labels = torch.tensor(dataframe.iloc[:, -1].values, dtype=torch.float32)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

epochs = 2000
learning_rate = 0.01
alpha = 0.00001
margin = 36
batch_size = 48
bitsize = 72
l1_reg = 1e-8
l2_reg = 1e-08

#List of Grid search parameters to iterate over

# learning_rate_values = [ 0.1, 0.01]
# alphas = [1e-03,  1e-5]
# margin_values = [24, 36, 48] 
# batch_size_values = [32, 48, 64]
# bitsize_values = [16, 24, 32, 40, 48, 56]
# l2_reg_values = [0, 1e-08]
# l1_reg_values = [0, 1e-08]
                                                           
# Train Valid & Test data
train_df,valid_df,test_df = get_and_check_data_prev(data_path,data_path_valid,data_path_test,dic_labels)                                

# DataLoader for training set
train_dataset = CosfireDataset(train_df)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# DataLoader for validation set
valid_dataset = CosfireDataset(valid_df)
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)
                  
model = CosfireNet(input_size, bitsize, l1_reg, l2_reg)

optimizer = optim.RMSprop(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.95)

# Lists to store training and validation losses
train_losses = []
train_losses_eval = []

val_losses = []
# Variables to keep track of the best model

# Training loop
for _ in tqdm(range(epochs), desc='Training Progress', leave=True):
    model.train()
    total_train_loss = 0.0
    for train_inputs, labels in train_dataloader:
        optimizer.zero_grad()
        train_outputs, reg_loss = model(train_inputs)
        loss = DSHLoss(u=train_outputs, y=labels, alpha=alpha, margin=margin) + reg_loss
        loss.backward()
        optimizer.step()
        # Weight the batch loss by the batch size to accumulate a per-sample sum
        total_train_loss += loss.item() * train_inputs.size(0)
    scheduler.step()

    # Average training loss per sample
    average_train_loss = total_train_loss / len(train_dataloader.dataset)
    train_losses.append(average_train_loss)

    # Validation loop
    model.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for val_inputs, val_labels in valid_dataloader:
            val_outputs, reg_loss = model(val_inputs)
            val_loss = DSHLoss(u=val_outputs, y=val_labels, alpha=alpha, margin=margin) + reg_loss
            total_val_loss += val_loss.item() * val_inputs.size(0)

    # Average validation loss per sample
    average_val_loss = total_val_loss / len(valid_dataloader.dataset)
    val_losses.append(average_val_loss)




plt.plot(range(1, epochs + 1), train_losses, label='Training Loss')
plt.plot(range(1, epochs + 1), val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss Curves')
plt.legend()
plt.savefig(output_dir + '/Train_valid_curves.png')
plt.close()

#########################################################################
#################     Evaluate                                                  
##########################################################################

# Use evaluation mode so that batch normalization relies on its running statistics
model.eval()
valid_dataset_eval = CosfireDataset(valid_df)
valid_dataloader_eval = DataLoader(valid_dataset_eval, batch_size=batch_size, shuffle=False)



train_dataset_eval = CosfireDataset(train_df)
train_dataloader_eval = DataLoader(train_dataset_eval, batch_size=batch_size, shuffle=False)


test_dataset = CosfireDataset(test_df)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Lists to store predictions
predictions = []

# Perform predictions on the train set
with torch.no_grad():
      for train_inputs, _ in tqdm(train_dataloader_eval, desc='Predicting', leave=True):
         train_outputs,_ = model(train_inputs)
         predictions.append(train_outputs.numpy())

# Flatten the predictions
flat_predictions_train = [item for sublist in predictions for item in sublist]

# Append predictions to the df_train DataFrame
train_df['predictions'] = flat_predictions_train
train_df['label_name'] = train_df['label_code'].map(dic_labels_rev)
train_df.to_csv(output_dir +'/train_df.csv',index = False)

#################################################################

predictions = []
# Perform predictions on the valid set
with torch.no_grad():
      for valid_inputs, _ in tqdm(valid_dataloader_eval, desc='Predicting', leave=True):
         valid_outputs,_ = model(valid_inputs)
         predictions.append(valid_outputs.numpy())
# Flatten the predictions
flat_predictions_valid = [item for sublist in predictions for item in sublist]
# Append predictions to the valid_df DataFrame
valid_df['predictions'] = flat_predictions_valid
valid_df['label_name'] = valid_df['label_code'].map(dic_labels_rev)
valid_df.to_csv(output_dir +'/valid_df.csv',index = False)


#########################################################################
##Testing
#########################################################################
#Perform predictions on the testing set

predictions_test = []
with torch.no_grad():
      for test_inputs, _ in tqdm(test_dataloader, desc='Predicting', leave=True):
         test_outputs,_ = model(test_inputs)
         predictions_test.append(test_outputs.numpy())

# Flatten the predictions
flat_predictions_test = [item for sublist in predictions_test for item in sublist]

# Append predictions to the test_df DataFrame
test_df['predictions'] = flat_predictions_test
test_df['label_name'] = test_df['label_code'].map(dic_labels_rev)
test_df.to_csv(output_dir +'/test_df.csv',index = False)

Binarization and Mean Average Precision (mAP)

Mean average precision (mAP) is a commonly used evaluation metric in image retrieval tasks. It measures the average precision across all queries in the dataset. Precision is defined as the number of relevant images retrieved divided by the total number of images retrieved.

The formulas for AP and mAP are given as:

\[\text{AP} = \frac{1}{\text{GTP}}\sum_{i=1}^{n}\text{Precision}(i)\times\text{Rel}(i),\]

\[\text{mAP} = \frac{1}{N_q}\sum_{j=1}^{N_q}\text{AP}_j,\]

where AP represents the average precision of one query, with \(n\) being the total number of reference images, and \(\text{GTP}\) the total number of ground truth positives, \(\text{Precision}(i)\) is the precision of the top \(i\) ranked reference images and \(\text{Rel}(i)\) is an indicator variable that is 1 if the \(i\)th image is relevant and 0 otherwise. Finally, the mAP is computed as the average of all AP values obtained for all \(N_q\) query images.

Tip
  • In image retrieval, a query is typically an image, and the task is to retrieve a set of relevant images from a large dataset. The mAP metric is used to evaluate the performance of the retrieval system by comparing the retrieved images to a set of ground-truth relevant images for each query.

  • mAP takes into account both the relevance and the ranking of the retrieved images. A high mAP score indicates that the retrieval system is able to retrieve a high proportion of relevant images, and that these images are ranked highly in the retrieved set.
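The AP and mAP definitions can be traced on a toy ranking. The sketch below is a plain Python illustration with made-up relevance lists; it is not the project's `mAP_values` function, whose implementation lives in the utilities module and may differ in detail (for example in how GTP is defined under a top-k cut-off).

```python
def average_precision(relevance):
    """AP for one query; relevance[i] is 1 if the (i+1)-th ranked image is relevant."""
    gtp = sum(relevance)                 # ground truth positives in the ranked list
    if gtp == 0:
        return 0.0
    hits = 0
    precision_sum = 0.0
    for rank, rel in enumerate(relevance, start=1):
        if rel:
            hits += 1
            precision_sum += hits / rank   # Precision(i) * Rel(i)
    return precision_sum / gtp

def mean_average_precision(relevance_lists):
    """mAP: the mean of AP over all queries."""
    return sum(average_precision(r) for r in relevance_lists) / len(relevance_lists)

queries = [
    [1, 0, 1, 0],   # relevant images at ranks 1 and 3
    [0, 1, 1, 1],   # relevant images at ranks 2, 3 and 4
]
ap_values = [average_precision(r) for r in queries]
map_value = mean_average_precision(queries)
print(ap_values, map_value)
```

The first query scores higher even though it retrieves fewer relevant images, because its first hit appears at rank 1; this is the ranking sensitivity described above.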

Code
thresholds_abs_values = np.arange(-1, 1.2, 0.1)
mAP_results_valid = []
mAP_results_test = []
for _,thresh in enumerate(thresholds_abs_values):
      
      mAP_valid_thresh,_,_, _, _, _,_ = mAP_values(train_df,valid_df,thresh = thresh, percentile = False)
      mAP_test_thresh,_,_, _, _, _,_ = mAP_values(train_df, test_df,thresh = thresh, percentile = False)

      mAP_results_valid.append(mAP_valid_thresh)
      mAP_results_test.append(mAP_test_thresh)

      #topkmap, cum_prec, cum_recall, cum_map = MapWithPR_values(train_df,valid_df,thresh = thresh, percentile = False)
      

# Plotting
data_abs_values = {'mAP_valid': mAP_results_valid,
         'mAP_test': mAP_results_test,
         'threshold': thresholds_abs_values}

df_thresh_abs_values = pd.DataFrame(data_abs_values)
df_thresh_abs_values.to_csv(output_dir +'/results_data_abs_values.csv',index = False)

# Find the index of the maximum mAP value
max_map_index = df_thresh_abs_values['mAP_valid'].idxmax()

# Retrieve the threshold corresponding to the maximum mAP
threshold_max_map_abs_values = df_thresh_abs_values.loc[max_map_index, 'threshold']
#%%
mAP_valid_abs_values,mAP_std,mAP_values1, r_binary, train_label, q_binary, valid_label = mAP_values(train_df,valid_df,thresh = threshold_max_map_abs_values, percentile = False)


fig, ax = plt.subplots(figsize=(10/3, 3))

plt.plot(thresholds_abs_values, mAP_results_valid,color='#ff7f0eff')
plt.scatter(threshold_max_map_abs_values, mAP_valid_abs_values, color='#d62728ff', marker='o', s=25)
plt.xlabel('Threshold')
plt.ylabel('mAP')

#plt.rc('font', family='Nimbus Roman')

plt.savefig(output_dir + '/Maps_curves_abs_values.svg',format='svg', dpi=1200)
plt.show()


print('The optimal threshold from validation data is: ', np.round(threshold_max_map_abs_values,2))
print('The Best Validation mAP is: ',np.round(mAP_valid_abs_values,2))
The optimal threshold from validation data is:  -0.6
The Best Validation mAP is:  88.48

We now apply the same model (the best model according to the validation data) to the test data, using the threshold that performed best on the validation data.
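The helper `mAP_values` is imported from the project utilities and is not shown in this document. As a hedged sketch of the binarization step it presumably performs, assuming each real-valued output is mapped to 1 when it exceeds the threshold and to 0 otherwise, thresholding could look like this (the function name `binarize` and the sample outputs are illustrative):

```python
import numpy as np

def binarize(outputs, thresh):
    """Map real-valued network outputs in [-1, 1] to 0/1 bits using a fixed threshold."""
    return (np.asarray(outputs) > thresh).astype(np.uint8)

# Two made-up output vectors of a 4-bit hash layer
outputs = np.array([
    [-0.95, -0.40, 0.10, 0.98],
    [-0.70, -0.55, -0.99, 0.30],
])
codes = binarize(outputs, thresh=-0.6)
print(codes)
```

With a threshold of -0.6, only strongly negative outputs become 0, which illustrates why the threshold is tuned on the validation set rather than fixed at zero.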

Code
mAP_test_abs_values,mAP_std,mAP_values1, train_binary, train_label, test_binary, valid_label = mAP_values(train_df, test_df,thresh = threshold_max_map_abs_values, percentile = False)


topkmap, cum_prec, cum_recall, cum_map = MapWithPR_values(train_df,valid_df,thresh = threshold_max_map_abs_values, percentile = False)

print('At the optimal threshold: ', np.round(threshold_max_map_abs_values,2))
print('The Test mAP is: ',np.round(mAP_test_abs_values,2))
At the optimal threshold:  -0.6
The Test mAP is:  90.7

Model Predictions overview

We explore the quality of the model predictions by visualizing the distribution of the predicted values for each class, first for the test set and then for the training set. This can help identify potential biases or patterns in the predictions that may be worth investigating further.

Code
dic_labels_rev = { 2:'Bent',
                3:'Compact',
                  0:'FRI',
                  1: 'FRII'
              }

train_df['labels'] = train_df['label_code'].map(dic_labels_rev)
test_df['labels'] = test_df['label_code'].map(dic_labels_rev)
# Create a figure and axis
fig, ax = plt.subplots(figsize=(8, 6))

# Iterate over class names
for label_name in train_df['label_name'].value_counts().index:
    # Select the test images that belong to this class
    dff = test_df[test_df['label_name'] == label_name]
    # Pool all hash-layer outputs of this class into one flat array
    out_array_test = np.concatenate(dff['predictions'].to_list())

    # Plot the KDE curve for this class
    sns.kdeplot(out_array_test, label=label_name, ax=ax)

# Set the x-axis limits
ax.set_xlim(-1, 1)

# Customize the plot
ax.set_xlabel('Value')
ax.set_ylabel('Density')
ax.grid(True)
ax.legend()

# Display the plot
plt.show()
plt.close()

# df = pd.DataFrame([test_df.predictions[0]])
# df.columns = ['hash_'+str(i) for i in range(36)]
# df['label'] = test_df.labels[0]
# for j in range(1,400,20):
#    df2 = pd.DataFrame([test_df.predictions[j]])
#    df2.columns = ['hash_'+str(i) for i in range(36)]
#    df2['label'] = test_df.labels[j]
#    df = pd.concat([df,df2])

# display(Markdown(df.to_markdown(index = True)))
Code
# Create a figure and axis
fig, ax = plt.subplots(figsize=(8, 6))

# Iterate over class names
for label_name in train_df['label_name'].value_counts().index:
    # Select the training images that belong to this class
    dff = train_df[train_df['label_name'] == label_name]
    # Pool all hash-layer outputs of this class into one flat array
    out_array_train = np.concatenate(dff['predictions'].to_list())

    # Plot the KDE curve for this class
    sns.kdeplot(out_array_train, label=label_name, ax=ax)

# Set the x-axis limits
ax.set_xlim(-1, 1)

# Customize the plot
ax.set_xlabel('Value')
ax.set_ylabel('Density')
ax.grid(True)
ax.legend()

# Display the plot
plt.show()
plt.close()


# df = pd.DataFrame([train_df.predictions[0]])
# df.columns = ['hash_'+str(i) for i in range(36)]
# df['label'] = train_df.labels[0]
# for j in range(1,1180,50):
#    df2 = pd.DataFrame([train_df.predictions[j]])
#    df2.columns = ['hash_'+str(i) for i in range(36)]
#    df2['label'] = train_df.labels[j]
#    df = pd.concat([df,df2])

# display(Markdown(df.to_markdown(index = True)))

Code
# %%
array_dat = []
for i in range(test_df['predictions'].shape[0]):
  array_dat.append(list(test_df['predictions'].iloc[i]))

array_dat = np.array(array_dat)
array_dat.shape

y = test_df.labels
tsne = TSNE(n_components=2, random_state=42)
z = tsne.fit_transform(array_dat)
df = pd.DataFrame()
df["y"] = y
df["comp-1"] = z[:,0]
df["comp-2"] = z[:,1]

sns.scatterplot(x="comp-1", y="comp-2", hue=df.y.tolist(),
                palette=sns.color_palette("hls", 4),
                data=df).set(title="Model last layer T-SNE projection")

plt.show()
plt.close()

Code
# %%
array_dat = []
for i in range(train_df['predictions'].shape[0]):
  array_dat.append(list(train_df['predictions'].iloc[i]))

array_dat = np.array(array_dat)
y = train_df.labels
tsne = TSNE(n_components=2, random_state=42)
z = tsne.fit_transform(array_dat)
df = pd.DataFrame()
df["y"] = y
df["comp-1"] = z[:,0]
df["comp-2"] = z[:,1]

sns.scatterplot(x="comp-1", y="comp-2", hue=df.y.tolist(),
                palette=sns.color_palette("hls", 4),
                data=df).set(title="Model last layer T-SNE projection")


plt.show()
plt.close()

Code
array_dat = []
for i in range(train_df['predictions'].shape[0]):
  array_dat.append(list(train_df['predictions'].iloc[i]))

for i in range(test_df['predictions'].shape[0]):
  array_dat.append(list(test_df['predictions'].iloc[i]))
array_dat = np.array(array_dat)


y = np.array(pd.concat([train_df.labels,test_df.labels]))
tsne = TSNE(n_components=2, random_state=42)
z = tsne.fit_transform(array_dat)
df = pd.DataFrame()
df["y"] = y
df["comp-1"] = z[:,0]
df["comp-2"] = z[:,1]

sns.scatterplot(x="comp-1", y="comp-2", hue=df.y.tolist(),
                palette=sns.color_palette("hls", 4),
                data=df).set(title="Model last layer T-SNE projection")
plt.show()
plt.close()

View the image retrieval - COSFIRE Approach

Code
def add_data_paths(train_paths = 'data/train/*/*', dic_labels_rev = dic_labels_rev):
   df_labels_train_paths = pd.DataFrame()
   df_labels_train_paths['paths'] = glob.glob(train_paths)
   # The class name is the parent folder of each image (works with either path separator)
   df_labels_train_paths['label'] = df_labels_train_paths['paths'].apply(lambda x: os.path.basename(os.path.dirname(x)))
   df_labels_train_paths['label_code'] = df_labels_train_paths['label'].map(dic_labels_rev)
   df_labels_train_paths = df_labels_train_paths.sort_values('label_code')
   df_labels_train_paths = df_labels_train_paths.reset_index()[['paths', 'label', 'label_code']]
   return df_labels_train_paths

df_labels_train_paths = add_data_paths(train_paths = 'data/train/*/*', dic_labels_rev = dic_labels_rev)
df_labels_test_paths = add_data_paths(train_paths = 'data/test/*/*', dic_labels_rev = dic_labels_rev)

def perf_percentages(input_data):
    unique, counts = np.unique(input_data, return_counts=True)
    df = pd.DataFrame()
    df['unique'] = unique
    df['counts'] = counts
    df['Percentage'] = np.round(counts / counts.sum() * 100)
    return df

    
def query_image(test_image_index = 190, 
               test_images_paths = df_labels_test_paths,
               train_images_db_paths = df_labels_train_paths,
               train_images_db = train_binary,
               test_binary = test_binary):

         
    print('Test Image is: ', test_images_paths.label[test_image_index])
    fig = plt.figure(figsize=(3, 3))
    image_test = Image.open(test_images_paths.paths[test_image_index])
    image_test = torch.from_numpy(np.array(image_test))
    plt.imshow(image_test[:, :, 1], cmap='viridis')
    plt.axis('off')
    plt.show()

    test_image = test_binary[test_image_index]  
    #np.count_nonzero(np.array([0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1,
      # 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0])==np.array([0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1,
      # 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0]))
    similarity_distance = np.count_nonzero(test_image != train_images_db, axis=1)
    sort_indices = np.argsort(similarity_distance)
    top_indices = sort_indices[:100]
    #print(top_indices)
    paths_to_imgs = [train_images_db_paths.paths[index] for _,index in enumerate(top_indices)]
    df = perf_percentages([train_images_db_paths.label[index] for index in top_indices])
    print(df)
    cols = 7
    rows = 4

    fig = plt.figure(figsize=(2 * cols, 2 * rows))
    # Show the first rows * cols retrieved images in a grid
    for i, img_path in enumerate(paths_to_imgs[:cols*rows]):
        ax = fig.add_subplot(rows, cols, i + 1)
        ax.grid(visible=False)
        ax.axis("off")
        image = Image.open(img_path)
        image = torch.from_numpy(np.array(image))
        ax.imshow(image[:, :, 1], cmap='viridis')
        # Title each panel with the class folder of the retrieved image
        ax.set_title(os.path.basename(os.path.dirname(img_path)))

    plt.show()


Code
query_image(test_image_index = 1)
Test Image is:  Bent

  unique  counts  Percentage
0   Bent     100       100.0


Code
query_image(test_image_index = random.randint(101, 202))
Test Image is:  Compact

    unique  counts  Percentage
0     Bent      70        70.0
1  Compact      29        29.0
2      FRI       1         1.0


Code
query_image(test_image_index = random.randint(205, 300))
Test Image is:  FRI

    unique  counts  Percentage
0  Compact       1         1.0
1      FRI      44        44.0
2     FRII      55        55.0


Code
query_image(test_image_index = random.randint(310, 400))
Test Image is:  FRII

  unique  counts  Percentage
0   FRII     100       100.0


Further Analyses

Model comparison with the performance of the DenseNet-161 model (Ndung’u et al. 2023), a similar study on image retrieval which serves as a suitable reference for comparison, given that the data set used in that paper is identical to the one used in this project.

Code
label = 'mean_std'

topk_number_images = list(range(10,1180,50)) + [1180]
mAP_topk = []
mAP_topk_std = []
map_values_list = []
for _, topk in enumerate(topk_number_images):
    maP,mAP_std,map_values, _, _, _,_= mAP_values(train_df, test_df,thresh = threshold_max_map_abs_values, percentile = False,topk=topk)
    mAP_topk.append(maP)
    mAP_topk_std.append(mAP_std)
    map_values_list.append(map_values)

data = {'topk_number_images': topk_number_images,
        'mAP': mAP_topk,
        'mAP_std': mAP_topk_std}
df_cosfire = pd.DataFrame(data)
df_cosfire.to_csv(f'mAP_vs_topk_images_72bit_cosfire_{label}.csv', index = False)

# sns.lineplot(data=df_cosfire, x='topk_number_images', y='mAP',label='Cosfire',  color='blue')
# plt.fill_between(df_cosfire['topk_number_images'], df_cosfire['mAP'] - df_cosfire['mAP_std'], df_cosfire['mAP'] + df_cosfire['mAP_std'], color='#439CEF')
# plt.xlabel('The number of retrieved samples')
# plt.ylabel('mAP')
# plt.ylim(0, 110)
# plt.savefig( f'map_vs_topk_number_images_cosfire_{label}.png')
# plt.savefig( f'map_vs_topk_number_images_cosfire_{label}.svg',format='svg', dpi=1200)
# plt.show()



train_df_dn = pd.read_csv(r"C:\Users\P307791\Documents\deep_hashing_github_v2\df_training_tl.csv")
test_df_dn = pd.read_csv(r"C:\Users\P307791\Documents\deep_hashing_github_v2\df_testing_tl.csv")

predictions_test = []
predictions_train = []

for i in range(train_df_dn.shape[0]):
    predictions_train.append(np.array(ast.literal_eval(train_df_dn.predictions[i])))

for i in range(test_df_dn.shape[0]):
    predictions_test.append(np.array(ast.literal_eval(test_df_dn.predictions[i])))

train_df_dn['predictions'] = predictions_train
test_df_dn['predictions'] = predictions_test

# DenseNet threshold, kept in its own variable so the COSFIRE threshold is not overwritten
threshold_dn = -0.9
topk=100
mAP_dn,mAP_std_dn,mAP_values_dn, r_binary_dn, train_label_dn, q_binary_dn, valid_label_dn = mAP_values(train_df_dn, test_df_dn,thresh = threshold_dn, percentile = False,topk=topk)
#print('mAP top 100  for DenseNet is: ',np.round(mAP_dn,2))

topk_number_images_dn = list(range(10,1180,50)) + [1180]
mAP_topk_dn = []
mAP_topk_std_dn = []
map_values_list = []
for _, topk in enumerate(topk_number_images_dn):
    maP_dn,mAP_std_dn,map_values, _, _, _, _= mAP_values(train_df_dn, test_df_dn,thresh = threshold_dn, percentile = False,topk=topk)
    mAP_topk_dn.append(maP_dn)
    mAP_topk_std_dn.append(mAP_std_dn)
    map_values_list.append(map_values)
    

data_densenet = {'topk_number_images': topk_number_images_dn,
        'mAP': mAP_topk_dn,
        'mAP_std': mAP_topk_std_dn}
df_densenet = pd.DataFrame(data_densenet)
df_densenet.to_csv(f'mAP_vs_topk_images_72bit_densenet_{label}.csv', index = False)

# # Plot the line curve
# sns.lineplot(data=df_densenet, x='topk_number_images', y='mAP',label='DenseNet',  color='blue')
# plt.fill_between(df_densenet['topk_number_images'], df_densenet['mAP'] - df_densenet['mAP_std'], df_densenet['mAP'] + df_densenet['mAP_std'], color='#439CEF')
# plt.xlabel('The number of retrieved samples')
# plt.ylabel('mAP')
# plt.ylim(0, 103)
# plt.savefig( f'map_vs_topk_number_images_densenet_{label}.png')
# plt.savefig( f'map_vs_topk_number_images_densenet_{label}.svg',format='svg', dpi=1200)
# plt.show()

sns.lineplot(data=df_cosfire, x='topk_number_images', y='mAP',label='Cosfire',  color='red')
# plt.fill_between(df_cosfire['topk_number_images'], df_cosfire['mAP'] - df_cosfire['mAP_std'], df_cosfire['mAP'] + df_cosfire['mAP_std'], color='#439CEF')
sns.lineplot(data=df_densenet, x='topk_number_images', y='mAP',label='DenseNet',  color='blue')
# plt.fill_between(df_densenet['topk_number_images'], df_densenet['mAP'] - df_densenet['mAP_std'], df_densenet['mAP'] + df_densenet['mAP_std'], color='#ffbaba')

plt.xlabel('The number of retrieved samples')
plt.ylabel('mAP')
plt.ylim(0, 95)
plt.savefig( f'map_vs_topk_number_images_densenet_{label}_cbd.png')
plt.savefig( f'map_vs_topk_number_images_densenet_{label}_cbd.svg',format='svg', dpi=1200)
plt.show()

Model comparison across different hash code lengths (bit sizes). The values below are derived from separate analyses.

Code
# df_top_mAP = df.sort_values(by="mAP", ascending=False).head(10)
# display(Markdown(df_top_mAP.to_markdown(index = True)))
data = {'bit': ['8_bit', '16_bit', '24_bit', '32_bit', '36_bit', '48_bit', '64_bit', '72_bit'],
        'mAP': [60.68, 60.12, 76.12, 91.76,93.06, 91.97, 91.41,90.77]}
df = pd.DataFrame(data)

# Create the bar plot
plt.figure(figsize=(10, 6))
plt.bar(df['bit'], df['mAP'])
#plt.ylim(50,100)

# Add value labels to the bars
for i, v in enumerate(df['mAP']):
    if df['bit'][i] in ['8_bit', '16_bit', '24_bit']:
        color = 'red'
    else:
        color = 'black'
    plt.text(i, v + 0.5, str(v), ha='center', color=color)


# Set the plot title and labels
plt.title('mAP Values for Different Bit sizes')
plt.xlabel('Bit Size')
plt.ylabel('mAP')

# Display the plot
plt.show()

References

Azzopardi, George, and Nicolai Petkov. 2012. “Trainable COSFIRE Filters for Keypoint Detection and Pattern Recognition.” IEEE Transactions on Pattern Analysis and Machine Intelligence 35 (2): 490–503.
Ndung’u, Steven, Trienko Grobler, Stefan J. Wijnholds, Dimka Karastoyanova, and George Azzopardi. 2023. “Deep Supervised Hashing for Fast Retrieval of Radio Image Cubes.” In 2023 XXXVth General Assembly and Scientific Symposium of the International Union of Radio Science (URSI GASS), 1–4. https://doi.org/10.23919/URSIGASS57860.2023.10265687.
Ndung’u, Steven, Trienko Grobler, Stefan J. Wijnholds, Dimka Karastoyanova, and George Azzopardi. 2024. “Classification of Radio Galaxies with Trainable COSFIRE Filters.” Monthly Notices of the Royal Astronomical Society 530 (1): 783–94. https://doi.org/10.1093/mnras/stae821.